package drds.plus.datasource.connection_restrict;


import drds.tools.$;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 平滑过渡器。(这个是针对是否可用的大门)
 * 解决不可用到可用时，瞬间的流量、连接等的暴增冲击问题
 * 第一批只允许执行1次，第二批允许执行2次，。。。最后一批之后不再限制(limits)
 * 每一批的名额用尽，要间隔timeDelay毫秒后，才开始下一批名额的发放。 在这个间隔时间期间的请求直接拒绝。
 * 允许负数的limit，含义为1/n的概率去执行。具体见构造函数
 * <pre>
 * 使用列子：
 */
@Slf4j
public class SmoothTransitionDevice {

    private final AtomicInteger count = new AtomicInteger();
    private final AtomicInteger batchNo = new AtomicInteger();
    private final int[] limits;
    private final AtomicInteger rejectCount = new AtomicInteger();
    private final long delay; // ms
    private volatile boolean available = true;
    private volatile boolean inSmooth = false; // 是否在平滑期
    private volatile long timeBegin;

    public SmoothTransitionDevice(long delay) {
        this.delay = delay;
        this.limits = new int[]{1, 2, 4, 8, 16, 32, 64};
    }

    /**
     * @param delay
     * @param limits 允许负数的limit，为了解决cliet数量相当多时，每个client放一个请求进来都会冲垮服务器的场景：
     *               >1，表示允许通过的次数 <br/>
     *               -1和0作用相同，表示放弃当前尝试，只是多延迟了一些时间。所以一般不设 <br/>
     *               -2表示这批尝试中，只随机放入所有client的1/2(让当前操作1/2的概率通过) <br/>
     *               -3表示这批尝试中，只随机放入所有client的1/3(让当前操作1/3的概率通过) ... <br/>
     *               例如：limits = new int[] { -4,-3,-2, 1, 2, 4, 8, 16, 32};
     */
    public SmoothTransitionDevice(long delay, int[] limits) {
        this.delay = delay;
        this.limits = limits;
    }

    public static SmoothTransitionDevice parse(String string) {
        Properties properties = new Properties();
        try {
            properties.load(new ByteArrayInputStream((string).getBytes()));
        } catch (Exception e) {
            log.error("解析平滑过渡器配置失败", $.printStackTraceToString(e));
            throw new RuntimeException(e);
        }
        long delay = 0;
        String[] limits = null;
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            String key = ((String) entry.getKey()).trim();
            String value = ((String) entry.getValue()).trim();
            switch (SmoothTransitionDeviceProperties.valueOf(key)) {
                case delay:
                    delay = Integer.parseInt(value);
                    break;
                case limits:
                    limits = value.split("\\|");
                    break;
                default:
                    break;
            }
        }
        if (delay == 0) {
            log.error("SmoothTransitionDevice Properties incomplete");
            return null;
        }
        if (limits != null) {
            int[] limitArray = new int[limits.length];
            for (int i = 0; i < limits.length; i++) {
                limitArray[i] = Integer.parseInt(limits[i].trim());
            }
            return new SmoothTransitionDevice(delay, limitArray);
        } else {
            return new SmoothTransitionDevice(delay);
        }
    }

    /**
     * 设置为可用
     */
    public void setAvailable() {
        if (available) {
            return;
        }
        //
        count.set(0);
        batchNo.set(0);
        this.inSmooth = true;
        this.available = true;
    }

    /**
     * 设置为不可用
     */
    public void setNotAvailable() {
        if (available) {
            rejectCount.set(0);
            this.available = false;
        }
    }

    /**
     * @return 返回实际可用不可用的信息
     */
    public boolean isAvailable() {
        return available;
    }

    public boolean isNotAvailable() {
        return !available;
    }

    /**
     * @return 返回平滑控制后的可用不可用信息 true: 可用； false：不可用，或者可用但没通过限流
     */
    public boolean checkSmoothThrough() {
        if (delay == 0) { // 相当于完全关闭这个功能
            return true;
        }
        while (available && inSmooth) { // 已可用，且在平滑期
            int batch = batchNo.get();
            if (batch >= limits.length) {
                inSmooth = false;//不需要再判断了
                count.set(0);
                batchNo.set(0);
                rejectCount.set(0);
                return available; // 返回可用
            }

            int limit = limits[batch];
            if (limit < -1) { // 0和-1不管，后面会直接放弃
                int randomInt = new Random().nextInt(-limit);//1/n
                if (randomInt == 0) {
                    return available; // 返回可用
                } else {
                    logReject(rejectCount, limit);
                    return false; // 返回不可用
                }
            }
            int current = count.get();
            while (current < limit) {
                if (count.compareAndSet(current, current + 1)) {
                    timeBegin = System.currentTimeMillis();
                    return available; // 返回可用
                }
                current = count.get();
            }
            // current >= limit
            if (System.currentTimeMillis() - timeBegin > delay) {
                // delay * (batch + 1) 这样做可能会人为造成聚集暴增  超过时间，才设置下一个批次
                batchNo.compareAndSet(batch, batch + 1);
                // 继续循环
            } else {
                logReject(rejectCount, limit);
                return false; // 返回不可用
            }
        }
        return available;
    }

    private void logReject(AtomicInteger rejectCount, int limit) {
        log.warn("A request reject in available switch. limit=" + limit + ",rejectCount=" + rejectCount.incrementAndGet());
    }


    public String toString() {
        return new StringBuilder("delay=").append(delay).append(",limits=").append(Arrays.toString(limits)).toString();
    }

}
